Source code for hysop.backend.hardware.pci_ids

# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import sys, os, gzip, bz2, urllib.request, urllib.error, urllib.parse, re, warnings, editdistance
from hysop import vprint
from hysop.tools.numpywrappers import npw
from hysop.tools.warning import HysopWarning
from hysop.tools.htypes import first_not_None


def _fmt_name(name):
    """Format vendor and device names for reversed lookup."""
    if name is None:
        return None
    else:
        assert isinstance(name, str)
        return name.strip().lower()


class PciVendor:
    """A PCI vendor parsed from a vendor line of a ``pci.ids`` file.

    Attributes set by the constructor:
      sid          -- vendor id as the 4-digit hexadecimal string read from the line
      id           -- the same id converted to an integer
      name         -- vendor name as written on the line
      devices      -- lookup table filled by ``_parse_device``; every device is
                      registered under its integer id, its hexadecimal string
                      id and its formatted (stripped, lower-cased) name
      device_names -- formatted device names, in registration order
    """

    _regexp = re.compile(r"([0-9a-f]{4})\s+(.*)")

    def __init__(self, vendor):
        """
        Class initializes with the raw line from pci.ids

        Raises ValueError when the line does not look like
        '<4 hex digits> <name>'.
        """
        vendor = vendor.strip()
        match = self._regexp.match(vendor)
        if not match:
            msg = "PCI vendor could not match regexp: {}."
            msg = msg.format(vendor)
            raise ValueError(msg)
        self.sid = match.group(1)
        self.id = int(self.sid, 16)
        self.name = match.group(2)
        self.devices = {}
        self.device_names = []

    def find_device(self, device_id, subdevice_id=None):
        """Return the device registered under device_id, or None if unknown.

        When subdevice_id is given and registered on that device, the
        sub-device is returned instead of the device.
        """
        if (device_id is None) or (device_id not in self.devices):
            return None
        device = self.devices[device_id]
        if (subdevice_id is None) or (subdevice_id not in device.subdevices):
            return device
        return device.subdevices[subdevice_id]

    def find_device_by_name(self, device_name, subdevice_name=None):
        """Return the device registered under the formatted device_name, or None.

        Names are normalized with _fmt_name before the lookup. When
        subdevice_name is given and is a key of the device's sub-device
        table, that sub-device is returned instead of the device.

        NOTE(review): PciDevice._parse_sub_device registers sub-devices by
        integer id and hexadecimal string only, so a sub-device *name* will
        normally not match and the device itself is returned.
        """
        device_name = _fmt_name(device_name)
        subdevice_name = _fmt_name(subdevice_name)
        if (device_name is None) or (device_name not in self.devices):
            return None
        device = self.devices[device_name]
        if (subdevice_name is None) or (subdevice_name not in device.subdevices):
            return device
        return device.subdevices[subdevice_name]

    def find_device_by_distance_to_name(self, device_name):
        """Return the device whose formatted name is closest to device_name.

        Closeness is the edit distance computed by editdistance.eval; on ties
        the device registered first wins. Returns None when no device has
        been registered for this vendor.
        """
        if len(self.device_names) == 0:
            return None
        device_name = _fmt_name(device_name)
        # min() returns the first minimal element, so no index juggling that
        # depends on the integer type returned by an argmin is needed.
        closest = min(
            self.device_names,
            key=lambda known: editdistance.eval(known, device_name),
        )
        return self.devices[closest]

    def _parse_device(self, device):
        """
        Parse a device and adds it to self.devices
        Input is a raw line from pci.ids
        Returns the parsed device_id.
        """
        device = PciDevice(device, vendor=self)
        device_id = device.id
        device_sid = device.sid
        device_name = _fmt_name(device.name)
        # The first definition of an id wins; later duplicates are ignored
        # but their id is still returned to the caller.
        if device_id not in self.devices:
            self.devices[device_id] = device
            self.devices[device_sid] = device
            self.devices[device_name] = device
            self.device_names.append(device_name)
        return device_id

    def __str__(self):
        return self.name
class PciDevice:
    """A PCI device parsed from a device line of a ``pci.ids`` file.

    Attributes set by the constructor:
      vendor     -- the vendor object passed by the caller
      sid        -- device id as the 4-digit hexadecimal string read from the line
      id         -- the same id converted to an integer
      name       -- device name as written on the line
      subdevices -- lookup table filled by ``_parse_sub_device``; every
                    sub-device is registered under its integer sub-device id
                    and its hexadecimal string sub-device id
    """

    _regexp = re.compile(r"([0-9a-f]{4})\s+(.*)")

    def __init__(self, device, vendor):
        """Parse a raw device line from pci.ids.

        Raises ValueError when the line does not look like
        '<4 hex digits> <name>'.
        """
        device = device.strip()
        match = self._regexp.match(device)
        if not match:
            msg = "pci device could not match regexp: {}."
            msg = msg.format(device)
            raise ValueError(msg)
        self.vendor = vendor
        self.sid = match.group(1)
        self.id = int(self.sid, 16)
        self.name = match.group(2)
        self.subdevices = {}

    def _parse_sub_device(self, subdevice):
        """
        Parse a subdevice and adds it to self.subdevices.
        Input is a raw line from pci.ids
        Returns the parsed subdevice_id.
        """
        subdevice = PciSubDevice(subdevice, device=self, vendor=self.vendor)
        subdevice_id = subdevice.subdevice_id
        subdevice_sid = subdevice.subdevice_sid
        # The first definition of a sub-device id wins; duplicates are ignored.
        if subdevice_id not in self.subdevices:
            self.subdevices[subdevice_id] = subdevice
            self.subdevices[subdevice_sid] = subdevice
        return subdevice_id

    def __str__(self):
        return self.name
class PciSubDevice:
    """A PCI sub-device (subsystem) parsed from a ``pci.ids`` sub-device line.

    Attributes set by the constructor:
      vendor, device               -- the owning objects passed by the caller
      subvendor_sid, subvendor_id  -- first 4-digit hexadecimal field, as
                                      string and as integer
      subdevice_sid, subdevice_id  -- second 4-digit hexadecimal field, as
                                      string and as integer
      name                         -- remaining text of the line
    """

    _regexp = re.compile(r"([0-9a-f]{4})\s+([0-9a-f]{4})\s+(.*)")

    def __init__(self, subdevice, device, vendor):
        """Parse a raw sub-device line from pci.ids.

        Raises ValueError when the line does not look like
        '<4 hex digits> <4 hex digits> <name>'.
        """
        subdevice = subdevice.strip()
        match = self._regexp.match(subdevice)
        if not match:
            # Name the right entity so a parsing failure points at the
            # sub-device line rather than at a device line.
            msg = "PCI subdevice could not match regexp: {}."
            msg = msg.format(subdevice)
            raise ValueError(msg)
        self.vendor = vendor
        self.device = device
        self.subvendor_sid = match.group(1)
        self.subvendor_id = int(self.subvendor_sid, 16)
        self.subdevice_sid = match.group(2)
        self.subdevice_id = int(self.subdevice_sid, 16)
        self.name = match.group(3)

    def __str__(self):
        return self.name
class PciDeviceClass:
    """A PCI device class parsed from a ``C xx  name`` line of ``pci.ids``.

    Attributes set by the constructor:
      sid               -- class id as the 2-digit hexadecimal string
      id                -- the same id converted to an integer
      name              -- class name as written on the line
      device_subclasses -- lookup table filled by ``_parse_sub_class``; every
                           subclass is registered under its integer id and
                           its hexadecimal string id
    """

    _regexp = re.compile(r"C\s+([0-9a-f]{2})\s+(.*)")

    def __init__(self, device_class):
        """Parse a raw device class line; raise ValueError if it is malformed."""
        raw = device_class.strip()
        found = self._regexp.match(raw)
        if found is None:
            raise ValueError(
                "Pci device class could not match regexp: {}.".format(raw)
            )
        hex_id, label = found.groups()
        self.sid = hex_id
        self.id = int(hex_id, 16)
        self.name = label
        self.device_subclasses = {}

    def _parse_sub_class(self, subclass):
        """
        Parse a device subclass and add it to self.device_subclasses.
        Input is a raw line from pci.ids.
        Returns the parsed subclass id.
        """
        parsed = PciDeviceSubClass(subclass, device_class=self)
        table = self.device_subclasses
        # First definition wins: an already known id is left untouched.
        if parsed.id not in table:
            table[parsed.id] = parsed
            table[parsed.sid] = parsed
        return parsed.id

    def __str__(self):
        return self.name
class PciDeviceSubClass:
    """A PCI device subclass parsed from a subclass line of ``pci.ids``.

    Attributes set by the constructor:
      device_class           -- the owning class object passed by the caller
      sid                    -- subclass id as the 2-digit hexadecimal string
      id                     -- the same id converted to an integer
      name                   -- subclass name as written on the line
      programming_interfaces -- lookup table filled by
                                ``_parse_programming_interface``; entries are
                                registered under integer id and hexadecimal
                                string id
    """

    _regexp = re.compile(r"([0-9a-f]{2})\s+(.*)")

    def __init__(self, device_subclass, device_class):
        """Parse a raw subclass line; raise ValueError if it is malformed."""
        raw = device_subclass.strip()
        found = self._regexp.match(raw)
        if found is None:
            raise ValueError(
                "Pci device subclass could not match regexp: {}.".format(raw)
            )
        hex_id, label = found.groups()
        self.device_class = device_class
        self.sid = hex_id
        self.id = int(hex_id, 16)
        self.name = label
        self.programming_interfaces = {}

    def _parse_programming_interface(self, interface):
        """Parse a raw programming interface line, register it and return its id."""
        parsed = PciProgrammingInterface(
            interface, device_subclass=self, device_class=self.device_class
        )
        table = self.programming_interfaces
        # First definition wins: an already known id is left untouched.
        if parsed.id not in table:
            table[parsed.id] = parsed
            table[parsed.sid] = parsed
        return parsed.id

    def __str__(self):
        return "{} ({})".format(self.name, self.device_class.name)
class PciProgrammingInterface:
    """A PCI programming interface parsed from a ``pci.ids`` prog-if line.

    Attributes set by the constructor:
      device_class, device_subclass -- the owning objects passed by the caller
      sid                           -- interface id as the 2-digit hexadecimal string
      id                            -- the same id converted to an integer
      name                          -- interface name as written on the line
    """

    _regexp = re.compile(r"([0-9a-f]{2})\s+(.*)")

    def __init__(self, interface, device_subclass, device_class):
        """Parse a raw programming interface line; raise ValueError if malformed."""
        raw = interface.strip()
        found = self._regexp.match(raw)
        if found is None:
            raise ValueError(
                "Pci device interface could not match regexp: {}.".format(raw)
            )
        hex_id, label = found.groups()
        self.device_class = device_class
        self.device_subclass = device_subclass
        self.sid = hex_id
        self.id = int(hex_id, 16)
        self.name = label

    def __str__(self):
        return self.name
class PCIIds:
    """
    Class used to parse all pci.ids entries.
    This file should contains up to date PCI vendors and device ids.

    The default path that is looked is '/usr/share/hwdata/pci.ids'
    See http://pciids.sourceforge.net/ to get urls.

    It can be updated using the command 'update-pciids', or by
    providing a custom source file or url in constructor.

    Usage:
        from hysop.backend.hardware.pci_ids import PCIIds
        pciids = PCIIds(filepath or url)

        #PCI DEVICES
        pciids.vendors[vendor_id]
              .devices[device_id]
              .subdevices[subdevice_id]

        #PCI DEVICE CLASSES
        pciids.device_classes[class_id]
              .device_subclasses[subclass_id]
              .programming_interfaces[interface_id]

    All ids can be given as hexadecimal strings or as integers.
    """

    def __init__(self, path=None, url=None):
        """
        Loads and parse a pci.ids file.
        The file may be compressed in gzip or bzip2 format.
        url has priority over path.

        When no url is given and the file at `path` cannot be read (OSError),
        a HysopWarning is issued and the default url is downloaded instead.
        """
        self.vendors = {}
        self.vendor_names = []
        self.device_classes = {}
        self._parsed = False

        path = first_not_None(path, "/usr/share/hwdata/pci.ids")

        if url is not None:
            self.load_from_url(url=url)
        elif path is not None:
            try:
                self.load_from_path(path=path)
            except OSError:
                msg = "'{}' has not been found on your system, default url is used instead."
                msg = msg.format(path)
                warnings.warn(HysopWarning(msg))
                self.load_from_url(url="http://pciids.sourceforge.net/v2.2/pci.ids")

        # Names are frozen into an array once parsing is over.
        self.vendor_names = npw.asarray(self.vendor_names, dtype=str)

    def load_from_url(self, url):
        """Download a pci.ids file from url and parse it.

        The payload is transparently decompressed when it starts with the
        gzip or bzip2 magic bytes, then decoded as UTF-8 (undecodable bytes
        are replaced) because the parser works on text.

        Raises RuntimeError when the download fails.
        """
        vprint(f"Fetching file from {url}...")
        try:
            with urllib.request.urlopen(url) as response:
                content = response.read()
        except urllib.error.URLError as err:
            # URLError also covers HTTPError (its subclass) and
            # connection-level failures such as an unreachable host.
            msg = "\nFailed to download {}, supply a custom path to provide pci.ids\n"
            msg = msg.format(url)
            raise RuntimeError(msg) from err
        vprint("Successfully downloaded pci.ids.")
        if isinstance(content, bytes):
            if content[:2] == b"\x1f\x8b":
                content = gzip.decompress(content)
            elif content[:3] == b"BZh":
                content = bz2.decompress(content)
            content = content.decode("utf-8", errors="replace")
        self._parse(content)

    def load_from_path(self, path):
        """Read a pci.ids file from disk and parse it.

        Supported file names end with '.ids', '.ids.gz' or '.ids.bz2'.
        Files are read in text mode (UTF-8, undecodable bytes replaced)
        because the parser works on text.

        Raises OSError when the file does not exist and ValueError when the
        extension is not supported.
        """
        if not os.path.isfile(path):
            msg = f"File '{path}' does not exist."
            raise OSError(msg)
        if path.endswith(".ids"):
            opener = open
        elif path.endswith(".ids.gz"):
            opener = gzip.open
        elif path.endswith(".ids.bz2"):
            opener = bz2.open
        else:
            msg = "File '{}' has an unknown extensions, valid ones are {}."
            msg = msg.format(path, ["*.ids", "*.ids.gz", "*.ids.bz2"])
            raise ValueError(msg)
        with opener(path, "rt", encoding="utf-8", errors="replace") as f:
            content = f.read()
        self._parse(content)

    def find_vendor(self, vendor_id):
        """Return the vendor registered under vendor_id, or None if unknown."""
        assert self._parsed
        if (vendor_id is None) or (vendor_id not in self.vendors):
            return None
        return self.vendors[vendor_id]

    def find_vendor_by_name(self, vendor_name):
        """Return the vendor registered under the formatted vendor_name, or None."""
        assert self._parsed
        vendor_name = _fmt_name(vendor_name)
        if (vendor_name is None) or (vendor_name not in self.vendors):
            return None
        return self.vendors[vendor_name]

    def find_vendor_by_distance_to_name(self, vendor_name):
        """Return the vendor whose formatted name is closest to vendor_name.

        Closeness is the edit distance computed by editdistance.eval; on ties
        the vendor registered first wins. Returns None when no vendor has
        been registered.
        """
        assert self._parsed
        # len() works for both the list used while parsing and the array
        # stored by the constructor.
        if len(self.vendor_names) == 0:
            return None
        vendor_name = _fmt_name(vendor_name)
        closest = min(
            self.vendor_names,
            key=lambda known: editdistance.eval(known, vendor_name),
        )
        return self.vendors[closest]

    def find_device(self, vendor_id, device_id, subdevice_id=None):
        """Return the matching device or sub-device, or None if not found."""
        vendor = self.find_vendor(vendor_id)
        if vendor is None:
            return None
        return vendor.find_device(device_id, subdevice_id)

    def find_device_by_name(self, vendor_name, device_name, subdevice_name=None):
        """Same as find_device, but vendor and device are looked up by name."""
        vendor = self.find_vendor_by_name(vendor_name)
        if vendor is None:
            return None
        return vendor.find_device_by_name(device_name, subdevice_name)

    def find_device_by_id(self, id):
        """Look a device up from a packed integer (or hexadecimal string) id.

        The value is split into 16-bit fields according to its magnitude:
          * more than 32 bits: vendor, device and sub-device ids
          * more than 16 bits: vendor and device ids
          * otherwise: a vendor id only, in which case no device can be
            resolved and None is returned.

        Raises ValueError when the value does not fit in 48 bits.
        """
        if isinstance(id, str):
            id = int(id, 16)
        if id > 0xFFFFFFFFFFFF:
            raise ValueError(id)
        elif id > 0xFFFFFFFF:
            vendor_id = (id & 0xFFFF00000000) >> (2 * 16)
            device_id = (id & 0x0000FFFF0000) >> (1 * 16)
            subdevice_id = (id & 0x00000000FFFF) >> (0 * 16)
        elif id > 0xFFFF:
            vendor_id = (id & 0xFFFF0000) >> (1 * 16)
            device_id = (id & 0x0000FFFF) >> (0 * 16)
            subdevice_id = None
        else:
            vendor_id = id & 0xFFFF
            device_id = None
            subdevice_id = None
        return self.find_device(vendor_id, device_id, subdevice_id)

    def find_device_class(
        self, device_class_id, device_subclass_id=None, programming_interface=None
    ):
        """Return the most specific known class entry for the given ids.

        Returns None when the class is unknown, the class when the subclass
        is missing or unknown, the subclass when the programming interface
        is missing or unknown, and the programming interface otherwise.
        """
        assert self._parsed
        if (device_class_id is None) or (device_class_id not in self.device_classes):
            return None
        device_class = self.device_classes[device_class_id]
        if (device_subclass_id is None) or (
            device_subclass_id not in device_class.device_subclasses
        ):
            return device_class
        device_subclass = device_class.device_subclasses[device_subclass_id]
        if (programming_interface is None) or (
            programming_interface not in device_subclass.programming_interfaces
        ):
            return device_subclass
        return device_subclass.programming_interfaces[programming_interface]

    def find_device_class_by_id(self, id):
        """Look a device class up from a packed integer (or hexadecimal string) id.

        The value is split into 8-bit fields according to its magnitude:
          * more than 16 bits: class, subclass and programming interface ids
          * more than 8 bits: class and subclass ids
          * otherwise: a class id only.

        Raises ValueError when the value does not fit in 24 bits.
        """
        if isinstance(id, str):
            id = int(id, 16)
        if id > 0xFFFFFF:
            raise ValueError(id)
        elif id > 0xFFFF:
            device_class_id = (id & 0xFF0000) >> (2 * 8)
            device_subclass_id = (id & 0x00FF00) >> (1 * 8)
            programming_interface_id = (id & 0x0000FF) >> (0 * 8)
        elif id > 0xFF:
            device_class_id = (id & 0xFF00) >> (1 * 8)
            device_subclass_id = (id & 0x00FF) >> (0 * 8)
            programming_interface_id = None
        else:
            device_class_id = id & 0xFF
            device_subclass_id = None
            programming_interface_id = None
        return self.find_device_class(
            device_class_id, device_subclass_id, programming_interface_id
        )

    def _parse(self, content):
        """Parse the text of a pci.ids file and fill the lookup tables.

        Blank lines and lines starting with '#' are skipped. A line starting
        with two tabs belongs to the last device (or device subclass), a line
        starting with one tab belongs to the last vendor (or device class), a
        line starting with 'C' opens a device class and any other line opens
        a vendor.

        Raises RuntimeError when an indented line appears before the entry
        it should belong to.
        """
        msg = "Something went wrong during parsing."
        # Parser state: which kind of section is open and the ids of the
        # entries indented lines attach to. Everything starts closed so that
        # a misplaced indented line is reported instead of crashing on an
        # unbound name.
        parse_vendor = False
        parse_device_class = False
        vendor_id = None
        device_id = None
        device_class_id = None
        device_subclass_id = None
        for line in content.split("\n"):
            stripped = line.strip()
            if (stripped == "") or (line[0] == "#"):
                continue
            elif line.startswith("\t\t"):
                if parse_vendor and (device_id is not None):
                    self.vendors[vendor_id].devices[device_id]._parse_sub_device(line)
                elif parse_device_class and (device_subclass_id is not None):
                    self.device_classes[device_class_id].device_subclasses[
                        device_subclass_id
                    ]._parse_programming_interface(line)
                else:
                    raise RuntimeError(msg)
            elif line.startswith("\t"):
                if parse_vendor:
                    device_id = self.vendors[vendor_id]._parse_device(line)
                elif parse_device_class:
                    device_subclass_id = self.device_classes[
                        device_class_id
                    ]._parse_sub_class(line)
                else:
                    raise RuntimeError(msg)
            elif stripped[0] == "C":
                device_class_id = self._parse_device_class(line)
                # A new class has no subclass yet.
                device_subclass_id = None
                parse_device_class = True
                parse_vendor = False
            else:
                vendor_id = self._parse_vendor(line)
                # A new vendor has no device yet.
                device_id = None
                parse_device_class = False
                parse_vendor = True
        self._parsed = True

    def _parse_vendor(self, line):
        """Parse a raw vendor line, register the vendor and return its integer id.

        The vendor is registered under its integer id, its hexadecimal string
        id and its formatted name; the first definition of an id wins.
        """
        vendor = PciVendor(line)
        vendor_id = vendor.id
        vendor_sid = vendor.sid
        vendor_name = _fmt_name(vendor.name)
        if vendor_id not in self.vendors:
            self.vendors[vendor_id] = vendor
            self.vendors[vendor_sid] = vendor
            self.vendors[vendor_name] = vendor
            self.vendor_names.append(vendor_name)
        return vendor_id

    def _parse_device_class(self, line):
        """Parse a raw device class line, register it and return its integer id.

        The class is registered under its integer id, its hexadecimal string
        id and its formatted name; the first definition of an id wins.
        """
        device_class = PciDeviceClass(line)
        device_class_id = device_class.id
        device_class_sid = device_class.sid
        device_class_name = _fmt_name(device_class.name)
        if device_class_id not in self.device_classes:
            self.device_classes[device_class_id] = device_class
            self.device_classes[device_class_sid] = device_class
            self.device_classes[device_class_name] = device_class
        return device_class_id